/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package internal

import (
	"encoding/json"
	"github.com/apache/rocketmq-client-go/v2/rlog"
	"strings"
	"testing"

	. "github.com/smartystreets/goconvey/convey"
	"github.com/tidwall/gjson"

	"github.com/apache/rocketmq-client-go/v2/internal/utils"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func TestHeartbeatData(t *testing.T) {
	Convey("test heatbeat json", t, func() {

		Convey("producerData set marshal", func() {
			pData := &producerData{
				GroupName: "group name",
			}
			pData2 := &producerData{
				GroupName: "group name 2",
			}
			set := utils.NewSet()
			set.Add(pData)
			set.Add(pData2)

			v, err := json.Marshal(set)
			So(err, ShouldBeNil)
			rlog.Info("Json Producer", map[string]interface{}{
				"result": string(v),
			})
		})

		Convey("producer heatbeat", func() {

			hbt := NewHeartbeatData("producer client id")
			p1 := &producerData{
				GroupName: "group name",
			}
			p2 := &producerData{
				GroupName: "group name 2",
			}

			hbt.ProducerDatas.Add(p1)
			hbt.ProducerDatas.Add(p2)

			v, err := json.Marshal(hbt)
			So(err, ShouldBeNil)
			rlog.Info("Json Producer", map[string]interface{}{
				"result": string(v),
			})
		})

		Convey("consumer heartbeat", func() {

			hbt := NewHeartbeatData("consumer client id")
			c1 := consumerData{
				GroupName: "consumer data 1",
			}
			c2 := consumerData{
				GroupName: "consumer data 2",
			}
			hbt.ConsumerDatas.Add(c1)
			hbt.ConsumerDatas.Add(c2)

			v, err := json.Marshal(hbt)
			So(err, ShouldBeNil)
			rlog.Info("Json Consumer", map[string]interface{}{
				"result": string(v),
			})
		})

		Convey("producer & consumer heartbeat", func() {

			hbt := NewHeartbeatData("consumer client id")

			p1 := &producerData{
				GroupName: "group name",
			}
			p2 := &producerData{
				GroupName: "group name 2",
			}

			hbt.ProducerDatas.Add(p1)
			hbt.ProducerDatas.Add(p2)

			c1 := consumerData{
				GroupName: "consumer data 1",
			}
			c2 := consumerData{
				GroupName: "consumer data 2",
			}
			hbt.ConsumerDatas.Add(c1)
			hbt.ConsumerDatas.Add(c2)

			v, err := json.Marshal(hbt)
			So(err, ShouldBeNil)
			rlog.Info("Json Producer and Consumer", map[string]interface{}{
				"result": string(v),
			})
		})
	})

}

func TestConsumerRunningInfo_MarshalJSON(t *testing.T) {
	Convey("test ConsumerRunningInfo MarshalJson", t, func() {
		props := map[string]string{
			"maxReconsumeTimes":             "-1",
			"unitMode":                      "false",
			"adjustThreadPoolNumsThreshold": "100000",
			"consumerGroup":                 "mq-client-go-test%GID_GO_TEST",
			"messageModel":                  "CLUSTERING",
			"suspendCurrentQueueTimeMillis": "1000",
			"pullThresholdSizeForTopic":     "-1",
			"pullThresholdSizeForQueue":     "100",
			"PROP_CLIENT_VERSION":           "V4_5_1",
			"consumeConcurrentlyMaxSpan":    "2000",
			"postSubscriptionWhenPull":      "false",
			"consumeTimestamp":              "20191127013617",
			"PROP_CONSUME_TYPE":             "CONSUME_PASSIVELY",
			"consumeTimeout":                "15",
			"consumeMessageBatchMaxSize":    "1",
			"PROP_THREADPOOL_CORE_SIZE":     "20",
			"pullInterval":                  "0",
			"pullThresholdForQueue":         "1000",
			"pullThresholdForTopic":         "-1",
			"consumeFromWhere":              "CONSUME_FROM_FIRST_OFFSET",
			"PROP_NAMESERVER_ADDR":          "mq-client-go-test.mq-internet-access.mq-internet.aliyuncs.com:80;",
			"pullBatchSize":                 "32",
			"consumeThreadMin":              "20",
			"PROP_CONSUMER_START_TIMESTAMP": "1574791577504",
			"consumeThreadMax":              "20",
			"subscription":                  "{}",
			"PROP_CONSUMEORDERLY":           "false",
		}
		subData := map[*SubscriptionData]bool{
			&SubscriptionData{
				ClassFilterMode: false,
				Codes:           utils.NewSet(),
				ExpType:         "TAG",
				SubString:       "*",
				SubVersion:      1574791579242,
				Tags:            utils.NewSet(),
				Topic:           "%RETRY%mq-client-go-test%GID_GO_TEST",
			}: true,
			&SubscriptionData{
				ClassFilterMode: true,
				Codes:           utils.NewSet(),
				ExpType:         "TAG",
				SubString:       "*",
				SubVersion:      1574791577523,
				Tags:            utils.NewSet(),
				Topic:           "mq-client-go-test%go-test",
			}: true,
		}
		statusTable := map[string]ConsumeStatus{
			"%RETRY%mq-client-go-test%GID_GO_TEST": {
				PullRT:            11.11,
				PullTPS:           22.22,
				ConsumeRT:         33.33,
				ConsumeOKTPS:      44.44,
				ConsumeFailedTPS:  55.55,
				ConsumeFailedMsgs: 666,
			},
			"mq-client-go-test%go-test": {
				PullRT:            123,
				PullTPS:           123,
				ConsumeRT:         123,
				ConsumeOKTPS:      123,
				ConsumeFailedTPS:  123,
				ConsumeFailedMsgs: 1234,
			},
		}
		mqTable := map[primitive.MessageQueue]ProcessQueueInfo{
			{
				Topic:      "%RETRY%mq-client-go-test%GID_GO_TEST",
				BrokerName: "qd7internet-01",
				QueueId:    0,
			}: {
				CommitOffset:            0,
				CachedMsgMinOffset:      0,
				CachedMsgMaxOffset:      0,
				CachedMsgCount:          0,
				CachedMsgSizeInMiB:      0,
				TransactionMsgMinOffset: 0,
				TransactionMsgMaxOffset: 0,
				TransactionMsgCount:     0,
				Locked:                  false,
				TryUnlockTimes:          0,
				LastLockTimestamp:       1574791579221,
				Dropped:                 false,
				LastPullTimestamp:       1574791579242,
				LastConsumeTimestamp:    1574791579221,
			},
			{
				Topic:      "%RETRY%mq-client-go-test%GID_GO_TEST",
				BrokerName: "qd7internet-01",
				QueueId:    1,
			}: {
				CommitOffset:            1,
				CachedMsgMinOffset:      2,
				CachedMsgMaxOffset:      3,
				CachedMsgCount:          4,
				CachedMsgSizeInMiB:      5,
				TransactionMsgMinOffset: 6,
				TransactionMsgMaxOffset: 7,
				TransactionMsgCount:     8,
				Locked:                  true,
				TryUnlockTimes:          9,
				LastLockTimestamp:       1574791579221,
				Dropped:                 false,
				LastPullTimestamp:       1574791579242,
				LastConsumeTimestamp:    1574791579221,
			},
		}
		info := ConsumerRunningInfo{
			Properties:       props,
			SubscriptionData: subData,
			StatusTable:      statusTable,
			MQTable:          mqTable,
		}
		data, err := info.Encode()
		So(err, ShouldBeNil)
		result := gjson.ParseBytes(data)
		Convey("test Properties fields", func() {
			r1 := result.Get("properties")
			So(r1.Exists(), ShouldBeTrue)
			m := r1.Map()
			So(len(m), ShouldEqual, 27)

			So(m["PROP_CLIENT_VERSION"], ShouldNotBeEmpty)
			So(m["PROP_CLIENT_VERSION"].String(), ShouldEqual, "V4_5_1")

			So(m["PROP_CONSUME_TYPE"], ShouldNotBeNil)
			So(m["PROP_CONSUME_TYPE"].String(), ShouldEqual, "CONSUME_PASSIVELY")

			So(m["PROP_THREADPOOL_CORE_SIZE"], ShouldNotBeNil)
			So(m["PROP_THREADPOOL_CORE_SIZE"].String(), ShouldEqual, "20")

			So(m["PROP_NAMESERVER_ADDR"], ShouldNotBeNil)
			So(m["PROP_NAMESERVER_ADDR"].String(), ShouldEqual, "mq-client-go-test.mq-internet-access.mq-internet.aliyuncs.com:80;")

			So(m["PROP_CONSUMER_START_TIMESTAMP"], ShouldNotBeNil)
			So(m["PROP_CONSUMER_START_TIMESTAMP"].String(), ShouldEqual, "1574791577504")

			So(m["PROP_CONSUMEORDERLY"], ShouldNotBeNil)
			So(m["PROP_CONSUMEORDERLY"].String(), ShouldEqual, "false")
		})
		Convey("test SubscriptionData fields", func() {
			r2 := result.Get("subscriptionSet")
			So(r2.Exists(), ShouldBeTrue)
			arr := r2.Array()
			So(len(arr), ShouldEqual, 2)

			m1 := arr[0].Map()
			So(len(m1), ShouldEqual, 7)
			So(m1["classFilterMode"].Bool(), ShouldEqual, false)
			So(len(m1["codes"].Array()), ShouldEqual, 0)
			So(m1["expressionType"].String(), ShouldEqual, "TAG")
			So(m1["subString"].String(), ShouldEqual, "*")
			So(m1["subVersion"].Int(), ShouldEqual, 1574791579242)
			So(len(m1["tags"].Array()), ShouldEqual, 0)
			So(m1["topic"].String(), ShouldEqual, "%RETRY%mq-client-go-test%GID_GO_TEST")

			m2 := arr[1].Map()
			So(len(m2), ShouldEqual, 7)
			So(m2["classFilterMode"].Bool(), ShouldEqual, true)
			So(len(m2["codes"].Array()), ShouldEqual, 0)
			So(m2["expressionType"].String(), ShouldEqual, "TAG")
			So(m2["subString"].String(), ShouldEqual, "*")
			So(m2["subVersion"].Int(), ShouldEqual, 1574791577523)
			So(len(m2["tags"].Array()), ShouldEqual, 0)
			So(m2["topic"].String(), ShouldEqual, "mq-client-go-test%go-test")
		})
		Convey("test StatusTable fields", func() {
			r3 := result.Get("statusTable")
			So(r3.Exists(), ShouldBeTrue)
			m := r3.Map()
			So(len(m), ShouldEqual, 2)

			status1 := m["mq-client-go-test%go-test"].Map()
			So(len(status1), ShouldEqual, 6)
			So(status1["pullRT"].Float(), ShouldEqual, 123)
			So(status1["pullTPS"].Float(), ShouldEqual, 123)
			So(status1["consumeRT"].Float(), ShouldEqual, 123)
			So(status1["consumeOKTPS"].Float(), ShouldEqual, 123)
			So(status1["consumeFailedTPS"].Float(), ShouldEqual, 123)
			So(status1["consumeFailedMsgs"].Int(), ShouldEqual, 1234)

			status2 := m["%RETRY%mq-client-go-test%GID_GO_TEST"].Map()
			So(len(status2), ShouldEqual, 6)
			So(status2["pullRT"].Float(), ShouldEqual, 11.11)
			So(status2["pullTPS"].Float(), ShouldEqual, 22.22)
			So(status2["consumeRT"].Float(), ShouldEqual, 33.33)
			So(status2["consumeOKTPS"].Float(), ShouldEqual, 44.44)
			So(status2["consumeFailedTPS"].Float(), ShouldEqual, 55.55)
			So(status2["consumeFailedMsgs"].Int(), ShouldEqual, 666)
		})
		Convey("test MQTable fields", func() {
			r4 := result.Get("mqTable")
			So(r4.Exists(), ShouldBeTrue)
			objNumbers := strings.Split(r4.String(), "},{")
			So(len(objNumbers), ShouldEqual, 2)

			obj1Str := objNumbers[0][1:len(objNumbers[0])] + "}"
			obj1KV := strings.Split(obj1Str, "}:{")
			So(len(obj1KV), ShouldEqual, 2)

			obj1 := gjson.Parse("{" + obj1KV[1][0:len(obj1KV[1])])
			So(obj1.Exists(), ShouldBeTrue)
			obj1M := obj1.Map()
			So(len(obj1M), ShouldEqual, 14)
			So(obj1M["commitOffset"].Int(), ShouldEqual, 0)
			So(obj1M["cachedMsgMinOffset"].Int(), ShouldEqual, 0)
			So(obj1M["cachedMsgMaxOffset"].Int(), ShouldEqual, 0)
			So(obj1M["cachedMsgCount"].Int(), ShouldEqual, 0)
			So(obj1M["cachedMsgSizeInMiB"].Int(), ShouldEqual, 0)
			So(obj1M["transactionMsgMinOffset"].Int(), ShouldEqual, 0)
			So(obj1M["transactionMsgMaxOffset"].Int(), ShouldEqual, 0)
			So(obj1M["transactionMsgCount"].Int(), ShouldEqual, 0)
			So(obj1M["locked"].Bool(), ShouldEqual, false)
			So(obj1M["tryUnlockTimes"].Int(), ShouldEqual, 0)
			So(obj1M["lastLockTimestamp"].Int(), ShouldEqual, 1574791579221)
			So(obj1M["dropped"].Bool(), ShouldEqual, false)
			So(obj1M["lastPullTimestamp"].Int(), ShouldEqual, 1574791579242)
			So(obj1M["lastConsumeTimestamp"].Int(), ShouldEqual, 1574791579221)

			obj2Str := "{" + objNumbers[1][0:len(objNumbers[1])-1]
			obj2KV := strings.Split(obj2Str, "}:{")
			So(len(obj2KV), ShouldEqual, 2)
			obj2 := gjson.Parse("{" + obj2KV[1][0:len(obj2KV[1])])
			So(obj2.Exists(), ShouldBeTrue)
			obj2M := obj2.Map()
			So(len(obj2M), ShouldEqual, 14)
			So(obj2M["commitOffset"].Int(), ShouldEqual, 1)
			So(obj2M["cachedMsgMinOffset"].Int(), ShouldEqual, 2)
			So(obj2M["cachedMsgMaxOffset"].Int(), ShouldEqual, 3)
			So(obj2M["cachedMsgCount"].Int(), ShouldEqual, 4)
			So(obj2M["cachedMsgSizeInMiB"].Int(), ShouldEqual, 5)
			So(obj2M["transactionMsgMinOffset"].Int(), ShouldEqual, 6)
			So(obj2M["transactionMsgMaxOffset"].Int(), ShouldEqual, 7)
			So(obj2M["transactionMsgCount"].Int(), ShouldEqual, 8)
			So(obj2M["locked"].Bool(), ShouldEqual, true)
			So(obj2M["tryUnlockTimes"].Int(), ShouldEqual, 9)
			So(obj2M["lastLockTimestamp"].Int(), ShouldEqual, 1574791579221)
			So(obj2M["dropped"].Bool(), ShouldEqual, false)
			So(obj2M["lastPullTimestamp"].Int(), ShouldEqual, 1574791579242)
			So(obj2M["lastConsumeTimestamp"].Int(), ShouldEqual, 1574791579221)
		})
	})
}

func TestConsumeMessageDirectlyResult_MarshalJSON(t *testing.T) {
	Convey("test ConsumeMessageDirectlyResult MarshalJson", t, func() {
		Convey("test consume success", func() {
			consumeMessageDirectlyResult := ConsumeMessageDirectlyResult{
				Order:          false,
				AutoCommit:     true,
				SpentTimeMills: 2,
			}
			consumeMessageDirectlyResult.ConsumeResult = ConsumeSuccess
			data, err := consumeMessageDirectlyResult.Encode()
			So(err, ShouldBeNil)
			rlog.Info("Json consumeMessageDirectlyResult", map[string]interface{}{
				"result": string(data),
			})
		})

		Convey("test consume timeout", func() {
			consumeResult := ConsumeMessageDirectlyResult{
				Order:          false,
				AutoCommit:     true,
				SpentTimeMills: 2,
			}
			consumeResult.ConsumeResult = ReturnNull
			data, err := consumeResult.Encode()
			So(err, ShouldBeNil)
			rlog.Info("Json consumeMessageDirectlyResult", map[string]interface{}{
				"result": string(data),
			})
		})

		Convey("test consume exception", func() {
			consumeResult := ConsumeMessageDirectlyResult{
				Order:          false,
				AutoCommit:     true,
				SpentTimeMills: 5,
			}
			consumeResult.ConsumeResult = ThrowException
			consumeResult.Remark = "Unknown Exception"
			data, err := consumeResult.Encode()
			So(err, ShouldBeNil)
			rlog.Info("Json consumeMessageDirectlyResult", map[string]interface{}{
				"result": string(data),
			})
		})
	})
}

func TestRestOffsetBody_MarshalJSON(t *testing.T) {
	Convey("test ResetOffset Body Decode gson json schema", t, func() {
		body := "{\"offsetTable\":[[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":5},23354233],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":4},23354245],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":7},23354203],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":6},23354312],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":1},23373517],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":0},23373350],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":3},23373424],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":2},23373382]]}"
		resetOffsetBody := new(ResetOffsetBody)
		resetOffsetBody.Decode([]byte(body))
		offsetTable := resetOffsetBody.OffsetTable
		So(offsetTable, ShouldNotBeNil)
		So(len(offsetTable), ShouldEqual, 8)
		messageQueue := primitive.MessageQueue{
			Topic:      "zx_tst",
			BrokerName: "tjwqtst-common-rocketmq-raft0",
			QueueId:    5,
		}
		So(offsetTable[messageQueue], ShouldEqual, 23354233)
	})

	Convey("test ResetOffset Body Decode fast json schema", t, func() {
		body := "{\"offsetTable\":{{\"brokerName\":\"RaftNode00\",\"queueId\":0,\"topic\":\"topicB\"}:11110,{\"brokerName\":\"RaftNode00\",\"queueId\":1,\"topic\":\"topicB\"}:0,{\"brokerName\":\"RaftNode00\",\"queueId\":2,\"topic\":\"topicB\"}:0,{\"brokerName\":\"RaftNode00\",\"queueId\":3,\"topic\":\"topicB\"}:0}}"
		resetOffsetBody := new(ResetOffsetBody)
		resetOffsetBody.Decode([]byte(body))
		offsetTable := resetOffsetBody.OffsetTable
		So(offsetTable, ShouldNotBeNil)
		So(len(offsetTable), ShouldEqual, 4)
		messageQueue := primitive.MessageQueue{
			Topic:      "topicB",
			BrokerName: "RaftNode00",
			QueueId:    0,
		}
		So(offsetTable[messageQueue], ShouldEqual, 11110)
	})

	Convey("test ResetOffset Body Decode fast json schema with one item", t, func() {
		body := "{\"offsetTable\":{{\"brokerName\":\"RaftNode00\",\"queueId\":0,\"topic\":\"topicB\"}:11110}}"
		resetOffsetBody := new(ResetOffsetBody)
		resetOffsetBody.Decode([]byte(body))
		offsetTable := resetOffsetBody.OffsetTable
		So(offsetTable, ShouldNotBeNil)
		So(len(offsetTable), ShouldEqual, 1)
		messageQueue := primitive.MessageQueue{
			Topic:      "topicB",
			BrokerName: "RaftNode00",
			QueueId:    0,
		}
		So(offsetTable[messageQueue], ShouldEqual, 11110)
	})

	Convey("test ResetOffset Body Decode empty fast json ", t, func() {
		body := "{\"offsetTable\":{}}"
		resetOffsetBody := new(ResetOffsetBody)
		resetOffsetBody.Decode([]byte(body))
		offsetTable := resetOffsetBody.OffsetTable
		So(offsetTable, ShouldNotBeNil)
		So(len(offsetTable), ShouldEqual, 0)
	})

	Convey("test ResetOffset Body Decode empty gson json ", t, func() {
		body := "{\"offsetTable\":[]}"
		resetOffsetBody := new(ResetOffsetBody)
		resetOffsetBody.Decode([]byte(body))
		offsetTable := resetOffsetBody.OffsetTable
		So(offsetTable, ShouldNotBeNil)
		So(len(offsetTable), ShouldEqual, 0)
	})
}
